package com.example.bankseckill.config;

import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.example.bankseckill.pojo.SecKillMessageConstants;
import com.example.bankseckill.pojo.SeckillmessageLog;
import com.example.bankseckill.service.ISeckillmessageLogService;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;
import java.util.Date;


/**
 * RabbitMQ configuration using a direct exchange.
 *
 * <p>Declares the seckill queue, the direct exchange and the binding between
 * them, and exposes a {@link RabbitTemplate} that has publisher-confirm and
 * publisher-return callbacks installed.
 *
 * <p>NOTE(review): the confirm callback is only invoked when publisher
 * confirms are enabled on the injected connection factory, and the return
 * callback only when publisher returns are enabled there — presumably done
 * through application properties; verify against the deployment config.
 *
 * @Author: liyangjing
 * @Date: 2022/02/25/14:03
 */
@Configuration
public class RabbitMQDirectConfig {


    public static final org.slf4j.Logger Logger = LoggerFactory.getLogger(RabbitMQDirectConfig.class);

    @Autowired
    private CachingConnectionFactory connectionFactory;

    @Autowired
    private ISeckillmessageLogService iSeckillmessageLogService;

    /**
     * Builds the {@link RabbitTemplate} used to publish messages.
     *
     * <p>The confirm callback marks the matching message-log row as sent
     * (status = 1) once the broker acknowledges the message. The return
     * callback logs messages that reached the exchange but could not be
     * routed to any queue.
     *
     * @return the configured template
     */
    @Bean
    public RabbitTemplate rabbitTemplate(){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // Without the mandatory flag the broker silently drops unroutable
        // messages and the return callback below would never be invoked.
        rabbitTemplate.setMandatory(true);

        /*
         * Confirm callback: reports whether the message reached the broker.
         * data:  correlation data carrying the unique message id (null when the
         *        sender did not supply any)
         * ack:   confirmation result
         * cause: failure reason, if any
         */
        rabbitTemplate.setConfirmCallback((data,ack,cause)->{
            // Messages published without CorrelationData arrive here with
            // data == null; guard so the callback does not throw.
            String msgId = (data != null) ? data.getId() : null;
            Logger.debug("=============={}", msgId);
            if (ack){
                // Only claim the order delay message was sent once the broker
                // has actually acknowledged it.
                if ("order".equals(msgId)){
                    Logger.info("当 前 时 间 ： {}，订单延迟队列消息已发送！",new Date());
                }
                // Message confirmed by the broker.
                Logger.info("{}=====>消息发送成功",msgId);
                if (msgId != null){
                    // Update the corresponding record in the database.
                    iSeckillmessageLogService.update(new UpdateWrapper<SeckillmessageLog>().set("status",1).eq("msgId",msgId));
                }
            }else {
                Logger.warn("{}=====>消息发送失败, cause: {}",msgId,cause);
            }
        });

        /*
         * Return callback: invoked when a message cannot be routed to a queue.
         * msg:        the returned message
         * repCode:    reply code
         * repText:    reply text
         * exchange:   exchange the message was published to
         * routingKey: routing key used
         */
        rabbitTemplate.setReturnCallback((msg,repCode,repText,exchange,routingKey)->{
            // Decode the payload; logging the raw byte[] only prints its identity hash.
            byte[] rawBody = msg.getBody();
            String body = (rawBody != null)
                    ? new String(rawBody, java.nio.charset.StandardCharsets.UTF_8)
                    : null;
            Logger.warn("{}=====>消息发送到queue时失败, replyCode: {}, replyText: {}, exchange: {}, routingKey: {}",
                    body, repCode, repText, exchange, routingKey);
        });
        return rabbitTemplate;
    }


    /**
     * Declares the durable queue named by {@link SecKillMessageConstants#QUEUE_NAME}.
     *
     * @return the queue definition
     */
    @Bean
    public Queue queue(){
        return new Queue(SecKillMessageConstants.QUEUE_NAME,true);
    }

    /**
     * Declares the direct exchange named by {@link SecKillMessageConstants#EXCHANGE_NAME}.
     *
     * @return the exchange definition
     */
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(SecKillMessageConstants.EXCHANGE_NAME);
    }

    /**
     * Binds the queue to the direct exchange under
     * {@link SecKillMessageConstants#ROUTING_KEY_NAME}.
     *
     * @return the binding definition
     */
    @Bean
    public Binding binding(){
        return BindingBuilder.bind(queue()).to(directExchange()).with(SecKillMessageConstants.ROUTING_KEY_NAME);
    }
}
